在确定了技术选型和技术架构之后,打算先从自己拿手部分解决核心的url to mp3的下载过程,准备使用odoo16开发一个urltomp3的模块,核心功能大概步骤:
- 用户输入youtube url链接
- 进入到urltomp3.com输出并点击下载
- 将url传输到核心下载接口
- 下载接口进入数据库进行比对,鉴权
- 调用开启线程使用yt_dlp进行下载
- 前端循环访问生成的下载接口,当服务器下载完毕之时,前端则直接进行下载
ORM部分
class DownYoutubeRecord(models.Model):
_name = "urltomp3.down.youtube.record"
_description = 'youtube下载记录表'
name = fields.Char("视频名称")
video_sn = fields.Char("标识id")
@api.depends('video_sn')
def _compute_down_sn(self):
"""
自动计算下载标识
"""
timestamp = int(time.time() * 1000) # 当前时间戳(毫秒级)
self.down_sn = "{}{}".format(self.video_sn, timestamp)
down_sn = fields.Char("下载标识", compute=_compute_down_sn, store=True)
down_url = fields.Char("下载url", required=True)
down_schedule = fields.Float("下载进度") # 后期可以通过这个参数进行实时更新功能
down_num = fields.Integer("下载次数")
down_type = fields.Selection([('0', '.mp3'), ('1', '.mp4')], default='0', string="下载格式")
state = fields.Selection([('0', '编辑中'), ('1', '下载中'),
('2', '已完成')], default='0', string="状态")
start_time = fields.Datetime("开始时间")
end_time = fields.Datetime("结束时间")
def down_audio(self):
"""
开始下载
"""
# 1. 检查地址是否是有效地址
url_pattern = r"(?:https?://)?(?:www\.)?youtube\.com/watch\?v=([^&]+)"
rec = re.match(url_pattern, self.down_url)
if not rec:
raise exceptions.ValidationError("请检查下载url地址")
# 2. 启动线程下载
kwargs = {
'record_id': self.id,
'down_url': self.down_url,
"down_sn": self.down_sn
}
down_youtube = DownYoutube(**kwargs)
threading.Thread(target=down_youtube.down_youtube_audio).start()
return self.down_sn
这里创建了down_audio的方法,用来处理接口下载请求
核心接口
class DownYoutubeController(http.Controller):
ip_access_counts = defaultdict(int)
ip_last_access = {}
def ip_test(self, max_access_count, time_window):
ip_address = request.httprequest.remote_addr
max_access_count = max_access_count # 允许的最大访问次数
time_window = time_window # 时间窗口为5秒最大访问为1 则是每5秒最大访问1次
current_time = int(time.time())
# 检查IP最后访问时间
if ip_address in self.ip_last_access:
last_access_time = self.ip_last_access[ip_address]
time_diff = current_time - last_access_time
# 如果时间差小于时间窗口,则检查访问次数
if time_diff < time_window:
if self.ip_access_counts[ip_address] >= max_access_count:
return True
# 如果时间差超过时间窗口,则重置访问计数
else:
self.ip_access_counts[ip_address] = 0
# 更新IP访问计数和最后访问时间
self.ip_access_counts[ip_address] += 1
self.ip_last_access[ip_address] = current_time
return False
@http.route('/api/ydl/mp3', type='http', auth='none', methods=['POST'], csrf=False, cors='*')
def down_youtube_mp3(self, **kwargs):
"""
下载youtube mp3
"""
if self.ip_test(1, 1):
return my_response(message="下次次数过多,请稍作等待后继续下载!", error_code=4003)
# 在这里编写处理接口请求的代码
down_url = kwargs.get("down_url")
if not down_url:
return my_response(message="youtube url地址不可以为空!", error_code=4000)
# 1. 检查地址是否是有效地址
url_pattern = r"(?:https?://)?(?:www\.)?youtube\.com/watch\?v=([^&]+)"
rec = re.match(url_pattern, down_url)
if not rec:
return my_response(message="请检查下载的youtube url地址是否正确!", error_code=4000)
# 记录一下video_sn同时查看文件是否存在,不存在则下载
video_sn = rec[1]
try:
youtube_instance = http.request.env['urltomp3.down.youtube.record'].sudo()
instance = youtube_instance.search([("video_sn", '=', video_sn)])
if not instance:
instance = youtube_instance.create({
"video_sn": video_sn,
"down_url": down_url
})
down_sn = instance.down_audio()
except Exception as e:
print(e)
return my_response(message="服务器内部错误!", error_code=5000)
# 返回响应
return my_response(message="添加下载成功!", response={
"down_sn": down_sn
})
@http.route('/api/download/mp3/<path:down_sn>', type='http', auth='public', methods=['GET'], csrf=False, cors='*')
def download_file(self, down_sn):
# 拼接文件路径
if self.ip_test(1, 1):
return my_response(message="下次次数过多,请稍作等待后继续下载!", error_code=4003)
file_path = './addons/urltomp3/down_path/'
youtube_instance = http.request.env['urltomp3.down.youtube.record'].sudo()
instance = youtube_instance.search([("down_sn", '=', down_sn)])
if not instance:
return my_response(message="非法操作!", error_code=4003)
down_file_path = os.path.join(file_path, down_sn + '.mp3')
if os.path.exists(down_file_path):
# 如果存在则调用下载
try:
# 读取文件内容
with open(down_file_path, 'rb') as file:
file_data = file.read()
# quote可以有效解决中文编码的问题
filename = instance.name + '.mp3'
encoded_filename = quote(filename)
# 构建文件下载的响应
headers = [
('Content-Disposition', 'attachment; filename=%s' % encoded_filename),
('Content-Type', 'application/octet-stream'),
('Content-Length', len(file_data))
]
# 构造返回视频结果
response = request.make_response(file_data, headers)
response.mimetype = 'audio/mp3'
return response
except Exception as e:
return str(e)
else:
# 等待下载中, 返回下载进度
return my_response(message="等待下载完毕!")
- ip_test:先简单的进行一个ip控制,防止接口被爬虫访问
- /api/ydl/mp3: 就是接口下载部分
- /api/download/mp3/path:down_sn: 这里就是接口的实时访问下载接口
核心功能
一开始想使用youtube-dl但是发现已经过时,所以用了基于youtube-dl开发的模块yt_dlp来进行了核心下载功能的开发
database_name = 'fandx_odoo16'
class DownYoutube(object):
def __init__(self, down_sn, record_id, down_url, down_path='./addons/urltomp3/down_path'):
self.down_url = down_url
self.down_path = down_path
self.down_sn = down_sn
self.record_id = record_id
self.is_init_data = False
def progress_hook(self, status):
if not self.is_init_data:
title = False
try:
title = status.get("info_dict").get("title")
except Exception as e:
print(e)
with registry(database_name).cursor() as cr:
# 初始化数据库数据
env = api.Environment(cr, SUPERUSER_ID, {})
now_instance = env['urltomp3.down.youtube.record'].browse(self.record_id)
now_instance.name = title
cr.commit()
self.is_init_data = True
def down_youtube_audio(self):
"""
开始下载 """ down_file_path = os.path.join(self.down_path, self.down_sn)
ydl_opts = {
'format': 'bestaudio/best',
'postprocessors': [{
'key': 'FFmpegExtractAudio',
'preferredcodec': 'mp3',
'preferredquality': '192',
}],
'outtmpl': down_file_path,
'extractaudio': True,
'audioformat': 'mp3',
'audioquality': 192,
'progress_hooks': [self.progress_hook]
}
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
ydl.download([self.down_url])
总结
虽然完成了最小的功能,但是这个毕竟涉及到视频的下载,会非常的占用资源,所以接口的权限控制以及Ip控制和最大化队列控制都是非常重要的。目前都还没有进行完善,等所有功能都完善之后,在进行完善这些功能,减少服务器的负担。